/*
 * Copyright (c) 2016 MariaDB Corporation Ab
 *
 * Use of this software is governed by the Business Source License included
 * in the LICENSE.TXT file and at www.mariadb.com/bsl.
 *
 * Change Date: 2019-01-01
 *
 * On the date above, in accordance with the Business Source License, use
 * of this software will be governed by version 2 or later of the General
 * Public License.
 */

#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <ini.h>
#include <stdint.h>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
#include <mysql.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

/**
 * Node of a doubly linked list of received AMQP messages.
 *
 * In this file the list is only walked through `next` and torn down at
 * shutdown (each `message` is destroyed and the node freed).
 */
typedef struct delivery_t
{
    uint64_t dtag;              /**< presumably the AMQP delivery tag; never read in this file */
    amqp_message_t* message;    /**< message owned by this node */
    struct delivery_t* next;    /**< following node, NULL at the end of the list */
    struct delivery_t* prev;    /**< preceding node; not used in this file */
} DELIVERY;

/**
 * Runtime configuration of the consumer, filled in from the [consumer]
 * section of the configuration file. All strings are heap copies.
 */
typedef struct consumer_t
{
    char* hostname;         /**< RabbitMQ host passed to amqp_socket_open */
    char* vhost;            /**< virtual host passed to amqp_login */
    char* user;             /**< RabbitMQ user name */
    char* passwd;           /**< RabbitMQ password */
    char* queue;            /**< name of the queue to consume from */
    char* dbserver;         /**< SQL server host passed to mysql_real_connect */
    char* dbname;           /**< database that is created if missing and then selected */
    char* dbuser;           /**< SQL user name */
    char* dbpasswd;         /**< SQL password */
    DELIVERY* query_stack;  /**< list of pending deliveries, released at shutdown */
    int port;               /**< RabbitMQ port */
    int dbport;             /**< SQL server port */
} CONSUMER;

/**
 * Run flag of the main loop: set to 1 by main and cleared by the SIGINT
 * handler. It is written from a signal handler, hence volatile sig_atomic_t.
 */
static volatile sig_atomic_t all_ok;
/** Log stream; opened when a "logfile" key is parsed, otherwise main uses stdout. */
static FILE* out_fd;
/** The single consumer configuration instance, allocated in main. */
static CONSUMER* c_inst;
/** printf template: create the configured database (argument: database name). */
static const char* DB_DATABASE = "CREATE DATABASE IF NOT EXISTS %s;";
/** Statement creating the `pairs` table that stores queries and their replies. */
static const char* DB_TABLE =
    "CREATE TABLE IF NOT EXISTS pairs (tag VARCHAR(64) PRIMARY KEY NOT NULL, query VARCHAR(2048), reply VARCHAR(2048), date_in DATETIME NOT NULL, date_out DATETIME DEFAULT NULL, counter INT DEFAULT 1)";
/** printf template: insert a new query (arguments: tag, query text, unix timestamp). */
static const char* DB_INSERT = "INSERT INTO pairs(tag, query, date_in) VALUES ('%s','%s',FROM_UNIXTIME(%s))";
/** printf template: store a reply (arguments: reply text, unix timestamp, tag). */
static const char* DB_UPDATE = "UPDATE pairs SET reply='%s', date_out=FROM_UNIXTIME(%s) WHERE tag='%s'";
/** printf template: bump the counter of an already stored query (arguments: unix timestamp, query text). */
static const char* DB_INCREMENT =
    "UPDATE pairs SET counter = counter+1, date_out=FROM_UNIXTIME(%s) WHERE query='%s'";

/**
 * Signal handler registered for SIGINT.
 *
 * Clears the run flag tested by the main loop and arms a one second alarm.
 * This file installs no SIGALRM handler, so presumably the alarm serves as
 * a hard deadline for the shutdown.
 *
 * @param signum Number of the delivered signal; anything but SIGINT is ignored
 */
void sighndl(int signum)
{
    if (signum != SIGINT)
    {
        return;
    }

    all_ok = 0;
    alarm(1);
}

/**
 * Store a heap copy of value in *slot, releasing the copy left there by an
 * earlier occurrence of the same key. *slot becomes NULL if strdup fails.
 */
static void replaceConfigString(char** slot, const char* value)
{
    free(*slot);
    *slot = strdup(value);
}

/**
 * Configuration callback handed to ini_parse.
 *
 * Copies the recognised keys of the [consumer] section into the global
 * consumer instance and opens the log file named by "logfile" in append
 * mode. Other sections and unknown keys are ignored. A key that occurs more
 * than once keeps its last value; the earlier copy (or log stream) is
 * released instead of being leaked.
 *
 * @param user    Unused
 * @param section Name of the section the key belongs to
 * @param name    Key name
 * @param value   Key value
 * @return Always 1
 */
int handler(void* user, const char* section, const char* name,
            const char* value)
{
    (void)user;

    if (strcmp(section, "consumer") != 0)
    {
        return 1;
    }

    if (strcmp(name, "hostname") == 0)
    {
        replaceConfigString(&c_inst->hostname, value);
    }
    else if (strcmp(name, "vhost") == 0)
    {
        replaceConfigString(&c_inst->vhost, value);
    }
    else if (strcmp(name, "port") == 0)
    {
        c_inst->port = atoi(value);
    }
    else if (strcmp(name, "user") == 0)
    {
        replaceConfigString(&c_inst->user, value);
    }
    else if (strcmp(name, "passwd") == 0)
    {
        replaceConfigString(&c_inst->passwd, value);
    }
    else if (strcmp(name, "queue") == 0)
    {
        replaceConfigString(&c_inst->queue, value);
    }
    else if (strcmp(name, "dbserver") == 0)
    {
        replaceConfigString(&c_inst->dbserver, value);
    }
    else if (strcmp(name, "dbport") == 0)
    {
        c_inst->dbport = atoi(value);
    }
    else if (strcmp(name, "dbname") == 0)
    {
        replaceConfigString(&c_inst->dbname, value);
    }
    else if (strcmp(name, "dbuser") == 0)
    {
        replaceConfigString(&c_inst->dbuser, value);
    }
    else if (strcmp(name, "dbpasswd") == 0)
    {
        replaceConfigString(&c_inst->dbpasswd, value);
    }
    else if (strcmp(name, "logfile") == 0)
    {
        /* Close a log file opened by an earlier "logfile" key; never close
         * the standard streams from here. */
        if (out_fd != NULL && out_fd != stdout && out_fd != stderr)
        {
            fclose(out_fd);
        }
        out_fd = fopen(value, "ab");
    }

    return 1;
}

/**
 * Check whether two messages carry the same correlation id.
 *
 * Correlation ids come with an explicit length, so they are compared as raw
 * bytes: the lengths must match and so must every byte. No read goes past
 * the end of the shorter id. Two empty ids compare equal.
 *
 * @param a First message
 * @param b Second message
 * @return 1 if the correlation ids are identical, 0 otherwise
 */
int isPair(amqp_message_t* a, amqp_message_t* b)
{
    const size_t alen = a->properties.correlation_id.len;
    const size_t blen = b->properties.correlation_id.len;

    if (alen != blen)
    {
        return 0;
    }

    if (alen == 0)
    {
        /* Nothing to compare; also keeps NULL byte pointers away from memcmp. */
        return 1;
    }

    return memcmp(a->properties.correlation_id.bytes,
                  b->properties.correlation_id.bytes,
                  alen) == 0 ? 1 : 0;
}

/**
 * Send one setup statement and report a failure on stderr.
 *
 * @return 1 if the statement succeeded, 0 otherwise
 */
static int runSetupQuery(MYSQL* server, const char* sql)
{
    if (mysql_query(server, sql))
    {
        fprintf(stderr, "\33[31;1mError\33[0m: Could not send query MySQL server: %s\n", mysql_error(server));
        return 0;
    }

    return 1;
}

/**
 * Initialise the handle, connect to the configured SQL server and make sure
 * the database and the `pairs` table exist.
 *
 * Failures of the two CREATE statements are reported but tolerated, since
 * the objects may already exist. A failed USE is treated as an error because
 * no later statement could succeed without a selected database.
 *
 * The handle is always initialised by this call; closing it remains the
 * caller's job whatever the result.
 *
 * @param server Handle to initialise and connect
 * @return 1 when connected with the database selected, 0 on failure
 */
int connectToServer(MYSQL* server)
{
    mysql_init(server);

    mysql_options(server, MYSQL_READ_DEFAULT_GROUP, "client");
    mysql_options(server, MYSQL_OPT_USE_REMOTE_CONNECTION, 0);
    my_bool tr = 1;
    mysql_options(server, MYSQL_OPT_RECONNECT, &tr);

    MYSQL* result = mysql_real_connect(server,
                                       c_inst->dbserver,
                                       c_inst->dbuser,
                                       c_inst->dbpasswd,
                                       NULL,
                                       c_inst->dbport,
                                       NULL,
                                       0);

    if (result == NULL)
    {
        fprintf(out_fd, "\33[31;1mError\33[0m: Could not connect to MySQL server: %s\n", mysql_error(server));
        return 0;
    }

    /* Sized from the configured name so that neither template can overflow. */
    const size_t bsz = strlen(DB_DATABASE) + sizeof("USE %s;") + strlen(c_inst->dbname);
    char* qstr = calloc(bsz, sizeof(char));

    if (!qstr)
    {
        fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
        return 0;
    }

    /**Connection ok, check that the database and table exist*/

    snprintf(qstr, bsz, DB_DATABASE, c_inst->dbname);
    runSetupQuery(server, qstr);

    snprintf(qstr, bsz, "USE %s;", c_inst->dbname);
    const int selected = runSetupQuery(server, qstr);

    free(qstr);

    if (!selected)
    {
        return 0;
    }

    runSetupQuery(server, DB_TABLE);

    return 1;
}

/** Return 1 when the len-byte buffer equals the NUL-terminated literal exactly, 0 otherwise. */
static int msgIdEquals(const void* bytes, size_t len, const char* literal)
{
    return len == strlen(literal) && (len == 0 || memcmp(bytes, literal, len) == 0);
}

/** Return 1 when text is non-empty and consists only of decimal digits and '.', 0 otherwise. */
static int isNumericTimestamp(const char* text)
{
    return text[0] != '\0' && strspn(text, "0123456789.") == strlen(text);
}

/**
 * Store one received message in the `pairs` table.
 *
 * The body is expected to look like "<unix timestamp>|<text>"; the text ends
 * at the first newline. The message id selects the action:
 *  - "query": increment the counter of an identical stored query, or insert
 *    a new row when no row was updated
 *  - "reply": attach the text as reply to the row whose tag equals the
 *    correlation id
 *
 * The message id must match exactly, so an empty or partial id is rejected
 * rather than being taken as a prefix of "query". The timestamp is spliced
 * into the SQL unquoted, so it is only accepted when purely numeric.
 *
 * @param server Connected SQL handle
 * @param msg    Message to store
 * @return 0 on success, non-zero if the message is malformed, memory ran
 *         out or the SQL statement failed
 */
int sendMessage(MYSQL* server, amqp_message_t* msg)
{
    const size_t bodylen = msg->body.len;
    const size_t taglen = msg->properties.correlation_id.len;
    /* mysql_real_escape_string may need up to 2 * length + 1 bytes. */
    const size_t clnbodysz = (bodylen + 1) * 2 + 1;
    const size_t clntagsz = (taglen + 1) * 2 + 1;
    /* Upper bound for every statement built below, whichever template is used. */
    const size_t buffsz = 2 * clnbodysz + clntagsz +
                          strlen(DB_INSERT) + strlen(DB_UPDATE) + strlen(DB_INCREMENT);
    int rval = 0;
    char* saved = NULL;
    char* ptr = NULL;
    char* qstr = calloc(buffsz, sizeof(char));
    char* rawmsg = calloc(bodylen + 1, sizeof(char));
    char* clnmsg = calloc(clnbodysz, sizeof(char));
    char* rawdate = calloc(bodylen + 1, sizeof(char));
    char* clndate = calloc(clnbodysz, sizeof(char));
    char* rawtag = calloc(taglen + 1, sizeof(char));
    char* clntag = calloc(clntagsz, sizeof(char));

    if (!qstr || !rawmsg || !clnmsg || !rawdate || !clndate || !rawtag || !clntag)
    {
        fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
        rval = 1;
        goto cleanup;
    }

    /* The buffers are zero filled, so the copies below stay NUL-terminated. */
    if (bodylen > 0)
    {
        memcpy(qstr, msg->body.bytes, bodylen);
    }
    fprintf(out_fd, "Received: %s\n", qstr);

    ptr = strtok_r(qstr, "|", &saved);
    if (ptr == NULL)
    {
        fprintf(out_fd, "Message content not valid.\n");
        rval = 1;
        goto cleanup;
    }
    snprintf(rawdate, bodylen + 1, "%s", ptr);

    ptr = strtok_r(NULL, "\n", &saved);
    if (ptr == NULL || !isNumericTimestamp(rawdate))
    {
        fprintf(out_fd, "Message content not valid.\n");
        rval = 1;
        goto cleanup;
    }
    snprintf(rawmsg, bodylen + 1, "%s", ptr);

    if (taglen > 0)
    {
        memcpy(rawtag, msg->properties.correlation_id.bytes, taglen);
    }
    memset(qstr, 0, buffsz);

    mysql_real_escape_string(server, clnmsg, rawmsg, strlen(rawmsg));
    mysql_real_escape_string(server, clndate, rawdate, strlen(rawdate));
    mysql_real_escape_string(server, clntag, rawtag, strlen(rawtag));

    if (msgIdEquals(msg->properties.message_id.bytes,
                    msg->properties.message_id.len, "query"))
    {
        snprintf(qstr, buffsz, DB_INCREMENT, clndate, clnmsg);
        rval = mysql_query(server, qstr);

        /* No stored row matched this query text: insert it as a new one. */
        if (rval == 0 && mysql_affected_rows(server) == 0)
        {
            memset(qstr, 0, buffsz);
            snprintf(qstr, buffsz, DB_INSERT, clntag, clnmsg, clndate);
            rval = mysql_query(server, qstr);
        }
    }
    else if (msgIdEquals(msg->properties.message_id.bytes,
                         msg->properties.message_id.len, "reply"))
    {
        snprintf(qstr, buffsz, DB_UPDATE, clnmsg, clndate, clntag);
        rval = mysql_query(server, qstr);
    }
    else
    {
        fprintf(out_fd, "Message type not recognized.\n");
        rval = 1;
        goto cleanup;
    }

    if (rval)
    {
        fprintf(stderr, "Could not send query to SQL server:%s\n", mysql_error(server));
    }

cleanup:
    free(qstr);
    free(rawmsg);
    free(clnmsg);
    free(rawdate);
    free(clndate);
    free(rawtag);
    free(clntag);

    return rval;
}

/**
 * Return a NUL-terminated heap copy of the first len bytes at bytes, or NULL
 * if the allocation fails.
 */
static char* pairCopyBytes(const void* bytes, size_t len)
{
    char* copy = calloc(len + 1, sizeof(char));

    if (copy != NULL && len > 0)
    {
        memcpy(copy, bytes, len);
    }

    return copy;
}

/** Replace every newline in text with a space, in place. */
static void pairFlattenNewlines(char* text)
{
    for (; *text != '\0'; text++)
    {
        if (*text == '\n')
        {
            *text = ' ';
        }
    }
}

/**
 * Store a query/reply pair with a single INSERT.
 *
 * Whichever of the two messages has the message id "query" is taken as the
 * query, the other one as the reply. Newlines in the bodies and in the
 * correlation id are replaced by spaces and all three values are escaped
 * before being placed in the statement. The statement buffer is sized from
 * the worst-case escaped lengths, so pairs of any size fit.
 *
 * NOTE(review): the statement supplies three values in (query, reply, tag)
 * order while DB_TABLE in this file declares six columns starting with tag;
 * confirm which schema this function is meant for.
 *
 * @param server Connected SQL handle
 * @param a      One message of the pair
 * @param b      The other message of the pair
 * @return 1 on success, 0 on failure (note: the opposite of sendMessage)
 */
int sendToServer(MYSQL* server, amqp_message_t* a, amqp_message_t* b)
{
    static const char insert_fmt[] = "INSERT INTO pairs VALUES ('%s','%s','%s');";
    amqp_message_t* msg;
    amqp_message_t* reply;
    char* qstr = NULL;
    char* rawmsg = NULL;
    char* clnmsg = NULL;
    char* rawrpl = NULL;
    char* clnrpl = NULL;
    char* rawtag = NULL;
    char* clntag = NULL;
    int rval = 0;

    if (a->properties.message_id.len == strlen("query") &&
        strncmp(a->properties.message_id.bytes, "query",
                a->properties.message_id.len) == 0)
    {
        msg = a;
        reply = b;
    }
    else
    {
        msg = b;
        reply = a;
    }

    printf("pair: %.*s\nquery: %.*s\nreply: %.*s\n",
           (int)msg->properties.correlation_id.len,
           (char *)msg->properties.correlation_id.bytes,
           (int)msg->body.len,
           (char *)msg->body.bytes,
           (int)reply->body.len,
           (char *)reply->body.bytes);

    /* mysql_real_escape_string may need up to 2 * length + 1 bytes. */
    const size_t clnmsgsz = (msg->body.len + 1) * 2 + 1;
    const size_t clnrplsz = (reply->body.len + 1) * 2 + 1;
    const size_t clntagsz = (msg->properties.correlation_id.len + 1) * 2 + 1;
    const size_t buffsz = sizeof(insert_fmt) + clnmsgsz + clnrplsz + clntagsz;

    rawmsg = pairCopyBytes(msg->body.bytes, msg->body.len);
    rawrpl = pairCopyBytes(reply->body.bytes, reply->body.len);
    rawtag = pairCopyBytes(msg->properties.correlation_id.bytes,
                           msg->properties.correlation_id.len);
    clnmsg = calloc(clnmsgsz, sizeof(char));
    clnrpl = calloc(clnrplsz, sizeof(char));
    clntag = calloc(clntagsz, sizeof(char));
    qstr = calloc(buffsz, sizeof(char));

    if (!rawmsg || !rawrpl || !rawtag || !clnmsg || !clnrpl || !clntag || !qstr)
    {
        fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
        goto cleanup;
    }

    pairFlattenNewlines(rawmsg);
    pairFlattenNewlines(rawrpl);
    pairFlattenNewlines(rawtag);

    mysql_real_escape_string(server, clnmsg, rawmsg, strlen(rawmsg));
    mysql_real_escape_string(server, clnrpl, rawrpl, strlen(rawrpl));
    mysql_real_escape_string(server, clntag, rawtag, strlen(rawtag));

    snprintf(qstr, buffsz, insert_fmt, clnmsg, clnrpl, clntag);

    if (mysql_query(server, qstr))
    {
        fprintf(stderr, "Could not send query to SQL server:%s\n", mysql_error(server));
        goto cleanup;
    }

    rval = 1;

cleanup:
    free(qstr);
    free(rawmsg);
    free(clnmsg);
    free(rawrpl);
    free(clnrpl);
    free(rawtag);
    free(clntag);

    return rval;
}
int main(int argc, char** argv)
{
    int channel = 1, status = AMQP_STATUS_OK, cnfnlen;
    amqp_socket_t *socket = NULL;
    amqp_connection_state_t conn;
    amqp_rpc_reply_t ret;
    amqp_message_t *reply = NULL;
    amqp_frame_t frame;
    struct timeval timeout;
    MYSQL db_inst;
    char ch, *cnfname = NULL, *cnfpath = NULL;
    static const char* fname = "consumer.cnf";
    const char* default_path = "@CMAKE_INSTALL_PREFIX@/etc";

    if ((c_inst = calloc(1, sizeof(CONSUMER))) == NULL)
    {
        fprintf(stderr, "Fatal Error: Cannot allocate enough memory.\n");
        return 1;
    }

    if (signal(SIGINT, sighndl) == SIG_IGN)
    {
        signal(SIGINT, SIG_IGN);
    }

    while ((ch = getopt(argc, argv, "c:")) != -1)
    {
        switch (ch)
        {
            case 'c':
                cnfnlen = strlen(optarg);
                cnfpath = strdup(optarg);
                break;
            default:

                break;
        }
    }

    if (cnfpath == NULL)
    {
        cnfpath = strdup(default_path);
        cnfnlen = strlen(default_path);
    }

    cnfname = calloc(cnfnlen + strlen(fname) + 1, sizeof(char));

    if (cnfpath)
    {

        /**Config file path as argument*/
        strcpy(cnfname, cnfpath);
        if (cnfpath[cnfnlen - 1] != '/')
        {
            strcat(cnfname, "/");
        }

    }

    strcat(cnfname, fname);

    timeout.tv_sec = 1;
    timeout.tv_usec = 0;
    all_ok = 1;
    out_fd = NULL;



    /**Parse the INI file*/
    if (ini_parse(cnfname, handler, NULL) < 0)
    {

        /**Try to parse a config in the same directory*/
        if (ini_parse(fname, handler, NULL) < 0)
        {
            fprintf(stderr, "Fatal Error: Error parsing configuration file!\n");
            goto fatal_error;

        }
    }

    if (out_fd == NULL)
    {
        out_fd = stdout;
    }

    fprintf(out_fd, "\n--------------------------------------------------------------\n");

    /**Confirm that all parameters were in the configuration file*/
    if (!c_inst->hostname || !c_inst->vhost || !c_inst->user ||
        !c_inst->passwd || !c_inst->dbpasswd || !c_inst->queue ||
        !c_inst->dbserver || !c_inst->dbname || !c_inst->dbuser)
    {
        fprintf(stderr, "Fatal Error: Inadequate configuration file!\n");
        goto fatal_error;
    }

    connectToServer(&db_inst);

    if ((conn = amqp_new_connection()) == NULL ||
        (socket = amqp_tcp_socket_new(conn)) == NULL)
    {
        fprintf(stderr, "Fatal Error: Cannot create connection object or socket.\n");
        goto fatal_error;
    }

    if (amqp_socket_open(socket, c_inst->hostname, c_inst->port))
    {
        fprintf(stderr, "\33[31;1mRabbitMQ Error\33[0m: Cannot open socket.\n");
        goto error;
    }

    ret = amqp_login(conn, c_inst->vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, c_inst->user, c_inst->passwd);

    if (ret.reply_type != AMQP_RESPONSE_NORMAL)
    {
        fprintf(stderr, "\33[31;1mRabbitMQ Error\33[0m: Cannot login to server.\n");
        goto error;
    }

    amqp_channel_open(conn, channel);
    ret = amqp_get_rpc_reply(conn);

    if (ret.reply_type != AMQP_RESPONSE_NORMAL)
    {
        fprintf(stderr, "\33[31;1mRabbitMQ Error\33[0m: Cannot open channel.\n");
        goto error;
    }

    reply = malloc(sizeof(amqp_message_t));
    if (!reply)
    {
        fprintf(stderr, "Error: Cannot allocate enough memory.\n");
        goto error;
    }
    amqp_basic_consume(conn, channel, amqp_cstring_bytes(c_inst->queue), amqp_empty_bytes, 0, 0, 0,
                       amqp_empty_table);

    while (all_ok)
    {

        status = amqp_simple_wait_frame_noblock(conn, &frame, &timeout);

        /**No frames to read from server, possibly out of messages*/
        if (status == AMQP_STATUS_TIMEOUT)
        {
            sleep(timeout.tv_sec);
            continue;
        }

        if (frame.payload.method.id == AMQP_BASIC_DELIVER_METHOD)
        {

            amqp_basic_deliver_t* decoded = (amqp_basic_deliver_t*)frame.payload.method.decoded;

            amqp_read_message(conn, channel, reply, 0);

            if (sendMessage(&db_inst, reply))
            {

                fprintf(stderr, "\33[31;1mRabbitMQ Error\33[0m: Received malformed message.\n");
                amqp_basic_reject(conn, channel, decoded->delivery_tag, 0);
                amqp_destroy_message(reply);

            }
            else
            {

                amqp_basic_ack(conn, channel, decoded->delivery_tag, 0);
                amqp_destroy_message(reply);

            }

        }
        else
        {
            fprintf(stderr, "\33[31;1mRabbitMQ Error\33[0m: Received method from server: %s\n",
                    amqp_method_name(frame.payload.method.id));
            all_ok = 0;
            goto error;
        }

    }

    fprintf(out_fd, "Shutting down...\n");
error:

    mysql_close(&db_inst);
    mysql_library_end();
    if (c_inst && c_inst->query_stack)
    {

        while (c_inst->query_stack)
        {
            DELIVERY* d = c_inst->query_stack->next;
            amqp_destroy_message(c_inst->query_stack->message);
            free(c_inst->query_stack);
            c_inst->query_stack = d;
        }

    }

    amqp_channel_close(conn, channel, AMQP_REPLY_SUCCESS);
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);
fatal_error:

    if (out_fd)
    {
        fclose(out_fd);
    }


    if (c_inst)
    {

        free(c_inst->hostname);
        free(c_inst->vhost);
        free(c_inst->user);
        free(c_inst->passwd);
        free(c_inst->queue);
        free(c_inst->dbserver);
        free(c_inst->dbname);
        free(c_inst->dbuser);
        free(c_inst->dbpasswd);
        free(c_inst);

    }



    return all_ok;
}
